"""Introductory PySpark examples.

Covers creating an RDD with the PySpark API, reading text/JSON/CSV files
(including files on HDFS), and converting between RDD and DataFrame.

@software: PyCharm
"""
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession. "local" runs Spark in-process with a
# single worker thread, which is sufficient for these tutorial examples.
spark = SparkSession.builder.master("local").appName("test").getOrCreate()

# SparkContext is the entry point to the low-level RDD API.
sc = spark.sparkContext

# Create an RDD from an in-memory Python list and trigger an action.
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.count())  # prints 5

########################################
# 创建rdd
# spark = SparkSession.builder.master(master="local").appName("test").getOrCreate()
# sc = spark.sparkContext
# wordaList = ["cat", "elephant", "ret", "rat", "cat"]
# wordaRDD = sc.parallelize(wordaList)
# print(type(wordaRDD))  # 类型为rdd类型
# print(wordaRDD.take(5))  # 列出wordaRDD对象中的前五个项目
# print(wordaRDD.collect())  # 列出wordaRDD对象中的所有项目

###########################################

# 从hdfs上读取文件
# textFromHDFS = spark.read.text("hdfs:///tmp/aaaa.txt")
# textFromHDFS = spark.read.text("E:\\Python\\pyspark_demo01\\pyspark_data\\NASA_access_log_Jul95_100")
# print("查看第一条数据:{}".format(textFromHDFS.head()))  # 查看第一条数据
# print("查看前五条数据:{}".format(textFromHDFS.head(5)))  # 查看前五条数据
# print("查看全部数据:{}".format(textFromHDFS.show()))  # 查看全部数据

###########################################

# 从csv中读取数据
# path = "E:\\Python\\pyspark_demo01\\pyspark_data\\ratings.csv"  # 文件存储的路径
# schema = None
# sep = ","  # 分割符
# header = True
# csvDF = spark.read.csv(path=path, schema=schema, sep=sep, header=header)
# print("类型：{}".format(type(csvDF)))
# print("csv第一行的数据类型：{}".format(csvDF))#结果DataFrame[userid: string, movieid: string, rating: string, ts: string]
# print("查看第一条数据：{}".format(csvDF.head()))
# print("查看所有数据：{}".format(csvDF.show()))

###########################################

# 读取文本文件
# path = "E:\\Python\\pyspark_demo01\\pyspark_data\\kmeans_data.txt"
# textDF = spark.read.text(paths=path)
# print("查看第一条数据：{}".format(textDF.head()))
# print("查看所有数据：{}".format(textDF.show()))

###########################################

# 读取json数据
# jsonDF = spark.read.json("E:\\Python\\pyspark_demo01\\pyspark_data\\json_example.json")  # 类型为DataFrame
# print("查看第一条数据：{}".format(jsonDF.head()))
# # print("查看所有数据：{}".format(jsonDF.show()))
# # RDD与DataFrame的转换
# jsonRDD = jsonDF.rdd  # DataFrame转为RDD类型
# print("转换为RDD类型：{}".format(type(jsonRDD)))  # 转换为RDD类型：<class 'pyspark.rdd.RDD'>
# # 转换后不能通过head和show查看，可以通过take()来查看数据
# print("RDD类型查看数据:{}".format(jsonRDD.take(4)))
#
# jsonDF2 = spark.createDataFrame(jsonRDD)  # 也可以将RDD类型转换为DataFrame

###########################################
